package com.atguigu.handler

import com.atguigu.bean.StartUpLog
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

import java.text.SimpleDateFormat
import java.util
import java.util.Date

object DauHandler {

  /**
   * Intra-batch dedup: within one micro-batch, keep only the earliest record
   * (smallest `ts`) for each (mid, logDate) pair.
   *
   * @param filterByRedisDStream stream already deduplicated against Redis
   * @return stream with at most one StartUpLog per (mid, logDate) per batch
   */
  def filterbyGroup(filterByRedisDStream: DStream[StartUpLog]): DStream[StartUpLog] = {
    filterByRedisDStream
      // Key by (mid, logDate); a plain map suffices — the original
      // mapPartitions(partition => partition.map(...)) added no value.
      .map(data => ((data.mid, data.logDate), data))
      // reduceByKey combines on the map side, so only one record per key is
      // shuffled. The original groupByKey + sortWith + take(1) shuffled every
      // duplicate and sorted whole groups just to pick the minimum.
      .reduceByKey((a, b) => if (a.ts <= b.ts) a else b)
      .map(_._2)
  }

  /**
   * Cross-batch dedup: drop records whose mid has already been seen today,
   * using a Redis set per day ("DAU:" + logDate) as the seen-state.
   *
   * @param startUpLogDStream raw start-up log stream
   * @param sc                SparkContext (used by the broadcast-based scheme
   *                          three; kept for interface compatibility)
   * @return stream containing only first-seen mids for each logDate
   */
  def filterByRedis(startUpLogDStream: DStream[StartUpLog], sc: SparkContext): DStream[StartUpLog] = {
    // Scheme one: one Redis connection per record (correct but far too many
    // connections). Kept commented for reference.
//    val value = startUpLogDStream.filter(data => {
//      val jedis = new Jedis("hadoop102", 6379)
//
//      val redisKey = "DAU:" + data.logDate
//      val boolean = jedis.sismember(redisKey, data.mid)
//
//      jedis.close()
//      !boolean
//    })
//    value

    // Scheme two: one connection per partition. NOTE(review): as written this
    // has the same lazy-iterator bug fixed in scheme four below — jedis.close()
    // runs before the filter iterator is consumed. Kept commented for reference.
//    val value = startUpLogDStream.mapPartitions(partition => {
//      val jedis = new Jedis("hadoop102", 6379)
//
//      val logs = partition.filter(data => {
//        val redisKey = "DAU:" + data.logDate
//        val boolean = jedis.sismember(redisKey, data.mid)
//
//        !boolean
//      })
//
//      jedis.close()
//      logs
//    })
//    value

    // Scheme three: fetch today's whole mid set once per batch on the driver
    // and broadcast it to the executors. Kept commented for reference.
//    val sdf = new SimpleDateFormat("yyyy-MM-dd")
//    val value: DStream[StartUpLog] = startUpLogDStream.transform(rdd => {
//      val jedis = new Jedis("hadoop102", 6379)
//      val rediskey = "DAU:" + sdf.format(new Date(System.currentTimeMillis()))
//      val mids: util.Set[String] = jedis.smembers(rediskey)
//
//      val midsBC = sc.broadcast(mids)
//
//      val midFilterRDD: RDD[StartUpLog] = rdd.filter(data => {
//        !midsBC.value.contains(data.mid)
//      })
//
//      jedis.close()
//      midFilterRDD
//    })
//    value

    // Scheme four (active): one connection per partition, writing each
    // first-seen mid back immediately so duplicates *within* the same batch
    // and partition are also filtered.
    startUpLogDStream.mapPartitions(partition => {
      val jedis = new Jedis("hadoop102", 6379)

      // BUG FIX: Iterator.filter is lazy, so the original code closed the
      // Jedis connection before sismember/sadd ever ran (they execute only
      // when the downstream stage consumes the iterator, after close()).
      // Materialize with toList BEFORE closing the connection.
      val firstSeen: List[StartUpLog] = partition.filter((data: StartUpLog) => {
        // NOTE(review): this key is never given a TTL here; presumably old
        // daily sets are expired elsewhere — verify, or add jedis.expire.
        val redisKey = "DAU:" + data.logDate
        val alreadySeen = jedis.sismember(redisKey, data.mid)

        if (!alreadySeen) {
          // Record the mid right away so a second record with the same mid
          // later in this partition is rejected too.
          jedis.sadd(redisKey, data.mid)
        }

        !alreadySeen
      }).toList

      jedis.close()
      firstSeen.iterator
    })
  }


  /**
   * Persist the surviving mids to Redis, one set per day ("DAU:" + logDate),
   * using one connection per partition.
   *
   * @param startUpLogDStream fully deduplicated stream
   */
  def saveMidToRedis(startUpLogDStream: DStream[StartUpLog]): Unit = {

    startUpLogDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // One connection per partition; foreach is eager, so closing after
        // the loop is safe here (unlike the lazy filter in filterByRedis).
        val jedis = new Jedis("hadoop102", 6379)

        partition.foreach(data => {
          val redisKey = "DAU:" + data.logDate
          jedis.sadd(redisKey, data.mid)
        })

        jedis.close()
      })
    })

  }

}
